package com.example.demo.entity;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.demo.env.SpringContextHolder;
import com.example.demo.mapper.EmployeePoMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.List;

/**
 * Flink source that queries {@link EmployeePo} rows through {@link EmployeePoMapper}
 * and emits every returned row downstream.
 *
 * <p>The query is executed once per {@link #run(SourceContext)} invocation; after the
 * result list has been emitted (or the source has been cancelled) {@code run} returns.
 *
 * <p>The mapper is looked up from {@link SpringContextHolder} in
 * {@link #open(Configuration)} rather than in the constructor, so it is held in a
 * {@code transient} field and is never part of the serialized function.
 */
public class EmployeeRowToPo extends RichSourceFunction<EmployeePo> {

    /** Query filter; only its user name is used. May be {@code null}, meaning "no filter". */
    private final EmployeePo employeePo;

    /** Resolved in {@link #open(Configuration)}; transient because it is a runtime-only bean. */
    private transient EmployeePoMapper employeePoMapper;

    /** Cleared by {@link #cancel()} so that the emit loop in {@code run} stops early. */
    private volatile boolean running = true;

    /**
     * Creates the source.
     *
     * @param employeePo filter object; when its user name is not blank, only rows with that
     *                   exact user name are selected. A {@code null} filter selects without
     *                   any user-name condition.
     */
    public EmployeeRowToPo(EmployeePo employeePo) {
        this.employeePo = employeePo;
    }

    /**
     * Obtains the {@link EmployeePoMapper} bean from {@link SpringContextHolder}.
     *
     * @param parameters configuration passed through to {@code super.open}
     * @throws Exception if {@code super.open} or the bean lookup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        employeePoMapper = SpringContextHolder.getBean(EmployeePoMapper.class);
    }

    /**
     * Runs the query and emits each resulting row via {@code sourceContext.collect}.
     *
     * @param sourceContext context used to emit the selected rows
     */
    @Override
    public void run(SourceContext<EmployeePo> sourceContext) {
        // A null filter is treated like a blank user name: no condition is added.
        String userName = employeePo == null ? null : employeePo.getUserName();

        LambdaQueryWrapper<EmployeePo> queryWrapper = Wrappers.lambdaQuery();
        // The first argument is the "apply this condition" switch of the wrapper.
        queryWrapper.eq(StringUtils.isNotBlank(userName), EmployeePo::getUserName, userName);

        List<EmployeePo> employeePos = employeePoMapper.selectList(queryWrapper);
        if (CollectionUtils.isNotEmpty(employeePos)) {
            for (EmployeePo row : employeePos) {
                if (!running) {
                    // cancel() was called; stop emitting the remaining rows.
                    break;
                }
                sourceContext.collect(row);
            }
        }
    }

    /**
     * Requests the emit loop in {@link #run(SourceContext)} to stop.
     */
    @Override
    public void cancel() {
        running = false;
    }

}
